package org.databandtech.flinkstreaming;

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.databandtech.flinkstreaming.Entity.EpgVod;
import org.databandtech.flinkstreaming.Entity.ItemCount;

/**
 * cd /usr/app/flink-1.13.2 启动 ./bin/start-cluster.sh bin/flink run --class
 * org.databandtech.flinkstreaming.App /usr/app/flink-1.13.2/examples/rt.jar
 */
public class KafkaConsumerApp {

	static String OUTPUTPATH = "D:\\sink";

	public static void main(String[] args) {

		// 创建Flink执行环境
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        
        // Kafka参数
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop001:9092");
        properties.setProperty("group.id", "flink-group");
        String inputTopic = "Hello-Kafka";
        String outputTopicSink = "Hello-Kafka-out";
        
        //Source
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<String>(inputTopic, new SimpleStringSchema(), properties);
        
        //设置检查点
        //env.enableCheckpointing(5000); // checkpoint every 5000 msecs
        //设置活动时间
        //In Flink 1.12 the default stream time characteristic has been changed to TimeCharacteristic.EventTime, thus you don't need to call this method for enabling event-time support anymore. 
        //env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);//处理时间：处理时间是指执行相应 算子操作的机器的系统时间
        //env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);//摄取时间是事件进入Flink的时间。
        //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//事件时间是每个事件在其生产设备上发生的时间。
        
        //用于控制消费offset
        consumer.setStartFromEarliest();
        //consumer.setStartFromEarliest();     // start from the earliest record possible
        //consumer.setStartFromLatest();       // start from the latest record
        //consumer.setStartFromTimestamp(...); // start from specified epoch timestamp (milliseconds)
        //consumer.setStartFromGroupOffsets(); // the default behaviour
        //用于控制特殊消费Offset,Partition
        //Map<KafkaTopicPartition, LonPartitiong> specificStartOffsets = new HashMap<>();
        //specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
        //specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
        //specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);
        //myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
        
        //sink 1
        final StreamingFileSink<Tuple3<String,String,Integer>> sink = StreamingFileSink
        	    .forRowFormat(new Path(OUTPUTPATH), new SimpleStringEncoder<Tuple3<String,String,Integer>>("UTF-8"))
        	    .withRollingPolicy(
        	        DefaultRollingPolicy.builder()
        	            .withRolloverInterval(TimeUnit.MINUTES.toMillis(60000))
        	            .withInactivityInterval(TimeUnit.MINUTES.toMillis(5000))
        	            .withMaxPartSize(1024 * 1024 * 1024)
        	            .build())
        		.build();
        
        //sink 2
        final StreamingFileSink<ItemCount> sink2 = StreamingFileSink
        	    .forRowFormat(new Path(OUTPUTPATH), new SimpleStringEncoder<ItemCount>("UTF-8"))
        	    .withRollingPolicy(
        	        DefaultRollingPolicy.builder()
        	            .withRolloverInterval(TimeUnit.MINUTES.toMillis(60000))
        	            .withInactivityInterval(TimeUnit.MINUTES.toMillis(5000))
        	            .withMaxPartSize(1024 * 1024 * 1024)
        	            .build())
        		.build();
        
        //sink 3
        final StreamingFileSink<List<ItemCount>> sink3 = StreamingFileSink
        	    .forRowFormat(new Path(OUTPUTPATH), new SimpleStringEncoder<List<ItemCount>>("UTF-8"))
        	    .withRollingPolicy(
        	        DefaultRollingPolicy.builder()
        	            .withRolloverInterval(TimeUnit.MINUTES.toMillis(60000))
        	            .withInactivityInterval(TimeUnit.MINUTES.toMillis(5000))
        	            .withMaxPartSize(1024 * 1024 * 1024)
        	            .build())
        		.build();
        
        //redis sink
        //url: https://github.com/apache/bahir-flink/blob/master/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
        
        //kafka sink
        //url: https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/datastream/kafka/

        
        DataStream<String> stream = env.addSource(consumer);
        //stream.print(); //仅打印输入
        
        //类型转换，作为可重用的数据源
        DataStream<EpgVod> objStream = stream.map(new KafkaConsumerMapper());        
        objStream.print(); //仅打印输入
      
        //在线人数统计1，翻滚窗口，直接执行ProcessWindowFunction
//        DataStream<Tuple3<String,String,Integer>> onlineOut = 
//        		objStream
//        		.filter( item -> item.getSys_id().equals("m") || 
//        				item.getSys_id().equals("u") || item.getSys_id().equals("t")) //执行过滤
//        		.keyBy(obj -> obj.getSys_id()+"-"+obj.getAction_type()) //分区
//        		.window(TumblingProcessingTimeWindows.of(Time.minutes(1L))) //翻滚窗口
//        		.process(new KafkaOnlineCountFuntion());  //直接执行process
        
//        onlineOut.print(); 
        
        //在线人数统计2，滑动或滚动窗口，预聚合后执行窗口函数
        DataStream<ItemCount> onlineOut2 = 
        		objStream
        		.filter( item -> item.getSys_id().equals("m") || 
        				item.getSys_id().equals("u") || item.getSys_id().equals("t")) //执行过滤
        		.assignTimestampsAndWatermarks(WatermarkStrategy.<EpgVod>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        				.withTimestampAssigner(new KafkaTimestampAssigner())) // 定义水位和时间戳提取类实例
        		.keyBy(obj -> obj.getSys_id()+"-"+obj.getAction_type()) //分区
        		.window(TumblingProcessingTimeWindows.of(Time.minutes(1L))) //翻滚窗口
        		//.window(SlidingEventTimeWindows.of(Time.minutes(2L), Time.minutes(1L))) //滑动窗口
        		.aggregate(new KafkaAggFunction(), new KafkaWindowFunction("OnlineCount"));//预聚合 + 窗口行数
        
        onlineOut2.print(); 
       // onlineOut2.addSink(sink2);       
        
        DataStream<List<ItemCount>> onlineOutTopN = onlineOut2
        		.keyBy(obj -> obj.getWindowEnd())
        		.process(new KafkaTopNKeyedFunction(3)); // TopN排序,有状态处理
        
        onlineOutTopN.print(); 
        onlineOutTopN.addSink(sink3);       
      
        //区域人数topN统计
        DataStream<ItemCount> areaOut = 
        		objStream.keyBy(obj -> obj.getSys_id()+"-"+obj.getArea_code())
        		.window(TumblingProcessingTimeWindows.of(Time.minutes(1L)))
        		//.process(new KafkaAreaFuntion());
        		.aggregate(new KafkaAggFunction(), new KafkaWindowFunction("AreaCount"));//预聚合 + 窗口行数
  
        areaOut.print(); 
        areaOut.addSink(sink2);
        
        try {
			env.execute("kafkaTest");
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}
